Fix dropped errors in CompletionStage's *EitherAsync methods
#252
@cosineblast Since you're the original author of the |
4256055 to
7980c99
Compare
(toCompletableFuture [d]
  (to-completable-future d))
(then-exceptionally d f nil))
;; Only available since Java 12
Is there some kind of conditional to add these so they won't get forgotten?
I don't think there is a way to implement methods conditionally depending on the Java version. Are you aware of one? I mainly put these in for completion's sake (no pun intended 😄) so that we could uncomment them in the future when we decide to drop support for Java versions < 12 (which Clojure itself likely will do from 1.13 on).
- (defn- then-compose [d ^Function f]
+ (defn- then-compose [d ^Function f result]
    (assert-some f)
    (let [d' (deferred)]
Is this still necessary? Shouldn't the provided target (result) be used instead?
This is a bit subtle indeed 😅 The purpose of d' is to unwrap the deferred returned from f (= fd), so AFAICT we still need it. If it helps: thenCompose is basically the bind / flatMap operation of the CompletionStage monad 😬
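For comparison, the bind / flatMap reading can be seen on the JDK's own `CompletableFuture`. The following is a minimal Java sketch, not code from this PR; the class `ComposeSketch` and the helper `lengthAsync` are hypothetical names used only for illustration. `thenApply` leaves the stage returned by the function nested inside the result, while `thenCompose` unwraps it, which is the role `d'` plays in `then-compose`.

```java
import java.util.concurrent.CompletableFuture;

class ComposeSketch {
    // Hypothetical async step: returns a stage rather than a plain value.
    static CompletableFuture<Integer> lengthAsync(String s) {
        return CompletableFuture.completedFuture(s.length());
    }

    // thenApply does not unwrap: the result is a stage holding another stage.
    static CompletableFuture<CompletableFuture<Integer>> nested() {
        return CompletableFuture.completedFuture("manifold")
                .thenApply(ComposeSketch::lengthAsync);
    }

    // thenCompose unwraps the stage returned by the function (flatMap / bind).
    static CompletableFuture<Integer> flattened() {
        return CompletableFuture.completedFuture("manifold")
                .thenCompose(ComposeSketch::lengthAsync);
    }

    public static void main(String[] args) {
        System.out.println(nested().join().join());
        System.out.println(flattened().join());
    }
}
```

Both calls yield the same integer, but only the `thenCompose` variant has the flat type a caller would expect.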
src/manifold/deferred.clj
Outdated
#(success! d' %)
#(error! d' %)))
(fn [error] (error! d' error)))
(-> (completion-stage-result d #(->deferred (.apply f %)) result)
Would this ->deferred be run on the default executor?
See above - same applies here, of course.
  Unlike `chain`, this function does not unwrap the result of `f`; it will only be applied to the
  immediate value of `d`. This is for mimicking `CompletionStage`'s behavior."
  [d f]
  (let [d' (deferred)]
Wouldn't this deferred run on the default executor regardless of where d is getting executed?
No, the executor of d' will be nil which means its callbacks will be invoked on the same executor / thread which realizes it. So basically, d' inherits the executor of d here. This behavior isn't documented very well, alas. The best I could find is this paragraph:
Line 13 in d772ebf
> In these cases, we can move the stream or deferred `onto` an executor, guaranteeing that all actions resulting from an operation will be enqueued onto a thread pool rather than immediately executed. This executor can be generated via `manifold.executor/instrumented-executor`, or using the convenience methods `fixed-thread-executor` and `utilization-executor`. In addition to providing automatic instrumentation, these executors will guarantee that any streams or deferred created within their scope will also be "on" that executor. For this reason, it's often sufficient to only call `onto` on a single stream in a topology, as everything downstream of it will transitively be executed on the executor.
The implementation lives here:
manifold/src/manifold/deferred.clj
Lines 405 to 420 in d772ebf
(clojure.core/loop []
  (when-let [^IDeferredListener l# (.poll ~'listeners)]
    (try
      (if (nil? ~executor)
        (~(if success? `.onSuccess `.onError) ^IDeferredListener l# ~val)
        (.execute ~(with-meta executor {:tag "java.util.concurrent.Executor"})
          (fn []
            (try
              (~(if success? `.onSuccess `.onError) l# ~val)
              (catch Throwable e#
                #_(.printStackTrace e#)
                (log/error e# "error in deferred handler"))))))
      (catch Throwable e#
        #_(.printStackTrace e#)
        (log/error e# "error in deferred handler")))
    (recur)))
And finally, here's an example which empirically confirms this behavior:
(let [ex (ex/fixed-thread-executor
           1
           {:thread-factory (ex/thread-factory
                              (constantly "my executor")
                              (deliver (promise) nil))})
      d (deferred ex)
      d' (shallow-chain d #(prn :one % (.getName (Thread/currentThread))))]
  (chain d' #(prn :two % (.getName (Thread/currentThread))))
  (success! d :ok))
Output:
:one :ok "my executor"
:two nil "my executor"
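As a loose analogy only (this is JDK behavior, not manifold's), the non-async methods of `CompletableFuture` follow a similar rule: a dependent attached before completion runs on whichever thread completes the source. The Java sketch below assumes a hypothetical class name `CompletingThreadSketch` and reuses the thread name from the example above.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CompletingThreadSketch {
    // Returns the name of the thread that ran the dependent callback.
    static String callbackThreadName() {
        ExecutorService ex =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "my executor"));
        try {
            CompletableFuture<String> d = new CompletableFuture<>();
            // Attached while d is still incomplete and without an executor of its own.
            CompletableFuture<String> dPrime =
                    d.thenApply(v -> Thread.currentThread().getName());
            // Complete d from the executor's thread; the dependent runs there too.
            ex.execute(() -> d.complete("ok"));
            return dPrime.join();
        } finally {
            ex.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(callbackThreadName());
    }
}
```

The callback is registered before `d` completes, so it is the completing thread that runs it, much like `d'` picking up the executor of `d` when its own executor is `nil`.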
every day i learn something new
Same here! 😄
Very well pointed! I should probably have documented this detail with a comment in the original implementation, but I'm glad you've written it out explicitly
7980c99 to
b31b83a
Compare
Thanks for your review and approval @valerauko! Let's also wait for @cosineblast's approval before merging.
For `*Async` variants of `CompletionStage` methods, the (first) deferred argument was wrapped with
`onto` so that its callbacks are invoked on the given `executor`. This results in a new deferred
connected to the original one. In case of the `*EitherAsync` methods, that new deferred would then
leak when `alt` chooses the `other` one but the `onto` deferred ends up in an error state since
there was no way for the caller to attach an error handler to it.
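For reference, here is how the JDK's own `CompletableFuture` surfaces an error through the stage returned by `applyToEitherAsync`, so that the caller can attach a handler to it. This is a minimal Java sketch of the reference contract, not of the manifold fix; the class name `EitherAsyncSketch` is hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class EitherAsyncSketch {
    // Returns the error the caller observes on the stage returned by applyToEitherAsync.
    static Throwable errorSeenByCaller() {
        ExecutorService ex = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<String> d = new CompletableFuture<>();
            CompletableFuture<String> other = new CompletableFuture<>(); // never completed here
            d.completeExceptionally(new IllegalStateException("boom"));
            CompletableFuture<String> result =
                    d.applyToEitherAsync(other, String::toUpperCase, ex);
            try {
                result.join();
                return null; // not expected: d already failed and other never completes
            } catch (CompletionException e) {
                return e.getCause();
            }
        } finally {
            ex.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(errorSeenByCaller());
    }
}
```

The failure of `d` is reachable from the returned stage rather than being stranded on an intermediate object that the caller never sees.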
Furthermore, for async variants of methods accepting two completion stages, `f` was never executed
on the given executor. For example, `then-combine` was defined like this:
    (defn- then-combine [d other ^BiFunction f]
      (assert-some other f)
      (fmap-deferred (zip d other)
                     (fn [[x y]] (.apply f x y))))
Now `then-combine-async` (defined via `def-async-for-dual`) would wrap `d` with `(onto d executor)`
before calling `then-combine`. However, the deferred passed to `fmap-deferred` was the one returned
from `(zip d other)` which would not have the given executor attached. Thus, `f` would have been
executed on a different executor than desired.
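The behavior being mimicked can be checked against the JDK's `CompletableFuture`, where the function passed to `thenCombineAsync` runs on the supplied executor. A minimal Java sketch follows; the class name `CombineAsyncSketch` and the thread name `combine-executor` are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CombineAsyncSketch {
    // Returns "<thread name>:<sum>" as computed inside the combining function.
    static String combinerThreadAndSum() {
        ExecutorService ex =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "combine-executor"));
        try {
            CompletableFuture<Integer> d = CompletableFuture.completedFuture(1);
            CompletableFuture<Integer> other = CompletableFuture.completedFuture(2);
            // The BiFunction is submitted to ex even though both inputs are already complete.
            return d.thenCombineAsync(
                    other,
                    (x, y) -> Thread.currentThread().getName() + ":" + (x + y),
                    ex).join();
        } finally {
            ex.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(combinerThreadAndSum());
    }
}
```

This is the guarantee that wrapping only `d` with `onto` could not provide, since the callback was attached to the result of `zip` instead.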
This patch addresses both issues by adding a `to` deferred as the final argument to every method
implementation. This deferred is bound to the desired `executor`. The operation `f` is then attached
as a callback to that `to` deferred and the deferred of the main operation is connected to it. For
synchronous implementations, `nil` can be passed as `to` in which case the callback is attached
directly to the main operation deferred to save some overhead. This means that now the original
deferred is passed to `alt` so that all error states can be handled by the caller.
It also removes the `def-async-for` and `def-async-for-dual` macros and opts to instead always
explicitly pass the `to` deferred in all method implementations. This isn't much more verbose but
hopefully serves to make the code a bit easier to follow.
Finally, it renames `fmap-deferred` to `shallow-chain` for consistency with the established `chain`
naming and introduces the analogous `shallow-connect` (also private for now).
b31b83a to
1a2fc56
Compare
src/manifold/deferred.clj
Outdated
(defn- then-apply [d ^Function f]
  (assert-some f)
  (fmap-deferred d #(.apply f %)))
(defn- completion-stage-result [d f to]
nitpick: I'm all in for dropping the Haskell-like fmap terminology and going for more familiar manifold terms (I loved your decision with shallow-chain), but can't we name this completion-stage-result function to something more meaningful/specific?
Perhaps shallow-chain-with or shallow-chain-into? I don't think it deserves a documentation string since its implementation is very simple and shallow-chain is well explained (thanks for re-wording its docs btw), but I think this code would benefit from a better name for this, since it's used throughout the implementation.
> nitpick: I'm all in for dropping the Haskell-like fmap terminology and going for more familiar manifold terms (I loved your decision with shallow-chain),
Thanks, glad you like it, too 😊 Always feels a bit intrusive to rename functions like that but harmonizing it with the existing manifold terminology seemed prudent.
> but can't we name this completion-stage-result function to something more meaningful/specific?
Ah yes, thanks for noticing! I'm not too happy with that name myself but forgot to mention it in the description.
> Perhaps shallow-chain-with or shallow-chain-into? I don't think it deserves a documentation string since its implementation is very simple and shallow-chain is well explained (thanks for re-wording its docs btw), but I think this code would benefit from a better name for this, since it's used throughout the implementation.
Good ideas. Perhaps even shallow-chain-onto to bring the onto back in? Looking at it now, we could actually change the signature to accept an executor instead of a deferred, making the onto name even more fitting. Originally, I was passing in a deferred even in the synchronous case but as you can see, I have since refactored it so that we're passing nil instead now. Will give this a try!
Just pushed 9fb23ad as a fixup commit (will squash before merge later). I realized that I could further decompose it so that we now also have shallow-onto 😄 WDYT?
yeah, this is much better, lgtm
I'm much happier with this now, too. Are you still reviewing? Otherwise, would you be so kind to hit "approve" on the PR for posterity? 🙏
Thanks a lot!
(-> (completion-stage-result d #(->deferred (.apply f %)) to)
    (on-realized (fn [fd]
                   (shallow-connect fd d'))
                 (fn [error]
I loved the join . fmap implementation for then-compose using shallow-connect here, well done!
Thanks 🙏 Though I have to admit that I'm not fluent in Monadic, so I am not familiar with the join . fmap idiom. The code here is just the nicest way I could come up with to express this operation 😅
Prompted by these test failures.
Based on #245
TODO
`executor` instead of `to` deferred being passed to method implementations)